Skip to content

[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490

Open
JNSimba wants to merge 22 commits intoapache:masterfrom
JNSimba:improve_special_offset
Open

[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490
JNSimba wants to merge 22 commits intoapache:masterfrom
JNSimba:improve_special_offset

Conversation

@JNSimba
Copy link
Copy Markdown
Member

@JNSimba JNSimba commented Apr 14, 2026

Summary

  • Support specifying offset (binlog position, LSN, named modes) when creating or altering a StreamingInsertJob via FROM MYSQL/POSTGRES path.
  • FE: extend DataSourceConfigValidator to validate offset formats (initial/snapshot/latest/earliest/JSON); earliest is MySQL-only, rejected for PostgreSQL. Implement JdbcSourceOffsetProvider.deserializeOffsetProperty() for named modes and JSON offset parsing. Remove S3-only restriction in StreamingInsertJob.initInsertJob() and modifyPropertiesInternal() so CDC jobs can also use offset property. On ALTER, sync offset to sourceProperties for the FROM...TO path.
  • BE: support JSON LSN offset {"lsn":"N"} in PostgresSourceReader.generatePostgresConfig(), and handle SPECIFIC_OFFSETS mode in JdbcIncrementalSourceReader.getStartOffsetFromConfig().
  • Fix JdbcOffset.isValidOffset() and toSerializedJson() to return meaningful values instead of hardcoded false/null.

Test plan

  • Regression test: test_streaming_mysql_job_special_offset — earliest/latest/JSON binlog offset with data sync verification, ALTER JOB offset change, invalid format rejection
  • Regression test: test_streaming_postgres_job_special_offset — initial/latest with data sync, ALTER JOB with JSON LSN offset and data sync verification, earliest rejection for PG, invalid format rejection
  • Regression test: test_streaming_mysql_job_special_offset_restart_fe — create job with JSON binlog offset, verify data sync, restart FE, verify job recovery and continued sync

🤖 Generated with Claude Code

…tJob create and alter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Thearas
Copy link
Copy Markdown
Contributor

Thearas commented Apr 14, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

JNSimba and others added 2 commits April 14, 2026 19:14
… alter offset test

- CDC ALTER only allows JSON specific offset (reject named modes like initial/latest)
- ALTER offset uses PROPERTIES('offset'='{"file":"xxx","pos":"yyy"}') syntax
- Update regression cases to use PROPERTIES for ALTER
- Add cdc_stream TVF ALTER offset regression test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

…urceType().name()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba JNSimba requested a review from Copilot April 15, 2026 02:24
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends StreamingInsertJob CDC offset support so users can specify named offsets (initial/snapshot/latest/earliest) and JSON “specific offsets” when creating jobs via FROM MYSQL/POSTGRES, and JSON offsets when altering jobs (with validation and BE parsing support).

Changes:

  • FE: extend source config validation to accept additional offset modes and JSON offsets; wire validateSource to be data-source-type aware; allow CDC jobs to use offset property and restrict ALTER to JSON offsets for CDC.
  • FE: implement/adjust JDBC offset property deserialization and improve JdbcOffset behaviors (isValidOffset, toSerializedJson).
  • BE: add PostgreSQL JSON LSN offset support and handle SPECIFIC_OFFSETS startup mode.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy Adds TVF-path regression coverage for ALTER with JSON MySQL binlog offset.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy Adds PG regression coverage for create offsets + ALTER with JSON LSN, plus invalid cases.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy Adds FE-restart regression coverage for MySQL JSON binlog offset persistence.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy Adds MySQL regression coverage for earliest/latest/JSON offsets and ALTER behavior.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java Adds JSON LSN startup offset handling for PostgreSQL.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java Handles SPECIFIC_OFFSETS startup mode by constructing offset from config.
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java Passes source type into source config validation for offset-type-specific rules.
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java Passes data source type into source config validation during ALTER.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java Implements offset property deserialization for named modes and JSON.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java Adjusts offset validity and serialization behavior.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java Enables offset property validation beyond S3 and restricts CDC ALTER to JSON offsets.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java Adds data-source-type-aware offset validation and JSON detection helper.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java Outdated
Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 1 blocking issue.

  1. High - now serializes JDBC offsets in a different shape from what still reads. For binlog offsets the new JSON nests fields under , but the deserialize/replay path still expects a flattened list of string maps. persists this serialized value immediately after , so cloud replay / offset restore can no longer round-trip the altered CDC offset correctly.

Critical checkpoints:

  • Goal of task: Support special offsets on create/alter. Partially achieved; create/alter paths were extended and regression tests were added, but the new serialized offset format breaks persisted/replayed CDC offsets in cloud mode.
  • Modification size/focus: Focused overall.
  • Concurrency: No new concurrency concerns identified in the touched code paths.
  • Lifecycle/static init: No special lifecycle or static-init issues identified.
  • Configuration: No new configs added.
  • Compatibility/storage format: Blocking issue. The PR changes serialized JDBC offset shape without updating the corresponding deserialize/replay path.
  • Parallel code paths: Checked both and job paths; the persistence problem sits in shared JDBC offset serialization.
  • Special conditional checks: The new ALTER restriction to JSON offsets is understandable and commented.
  • Test coverage: New regression tests cover MySQL/Postgres create/alter and one FE-restart scenario, but they do not cover cloud-mode reset/replay of an altered JSON offset.
  • Observability: Existing logs seem adequate for this path.
  • Transaction/persistence: Problematic because offset reset/replay persists a format the current reader does not consume.
  • Data write/atomicity: No direct new write-atomicity issue beyond the persisted offset corruption risk.
  • FE-BE variable passing: No new FE-BE field propagation issues identified.
  • Performance: No material performance regressions identified in the reviewed diff.
  • Other issues: None beyond the blocking serialization mismatch.

…vider, fix serialization

- Add validateAlterOffset() to SourceOffsetProvider interface, CDC impl
  rejects non-JSON offset, S3 impl allows any format (default no-op)
- StreamingInsertJob delegates ALTER offset validation to provider
- AlterJobCommand.validateProps calls validateAlterOffset before validateOffset
- Restore S3-only guard in initInsertJob for PROPERTIES offset
- Fix JdbcOffset.toSerializedJson() to produce flat format compatible
  with deserializeOffset(): [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
- Remove unnecessary ts_usec padding in PostgresSourceReader JSON offset
- Remove instanceof check in modifyPropertiesInternal (validation moved to AlterJobCommand)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run buildall

…izeOffsetProperty

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 2 blocking correctness issues.

  1. FE accepts any JSON offset on ALTER for JDBC jobs. JdbcSourceOffsetProvider.validateAlterOffset() and deserializeOffsetProperty() only gate on isJsonOffset(), so a PostgreSQL job can accept MySQL-style JSON like {"file":"binlog.000001","pos":"154"} (and vice versa). ALTER succeeds, but the resumed job then fails in the broker because PostgresSourceReader.generatePostgresConfig() requires lsn.
  2. ALTER on the FROM MYSQL/POSTGRES ... TO path does not persist the new startup offset into sourceProperties. modifyPropertiesInternal() updates only the in-memory provider, while replay/reconstruction uses sourceProperties (for example in gsonPostProcess() and when creating the next multi-table task). If FE restarts before the first post-ALTER commit, the altered offset is lost and the job resumes from the old mode/offset.

Critical checkpoint conclusions:

  • Goal of current task: Partially accomplished. The PR adds special-offset support, but the ALTER path is still incorrect in the two scenarios above, so the feature is not correct end-to-end.
  • Modification size/clarity: Mostly focused, but the ALTER validation/persistence logic is split in a way that misses source-specific constraints and durable state updates.
  • Concurrency: No new lock-order or thread-safety issue found in the touched code.
  • Lifecycle/static initialization: No special lifecycle or static-init issue found.
  • Configuration items: No new config items added.
  • Compatibility: No incompatible wire/storage format change identified.
  • Parallel code paths: The CREATE path and downstream readers enforce source-specific semantics, but the ALTER path does not; that inconsistency is the root cause of one blocker.
  • Special conditional checks: The new JSON-only check is too broad and should be source-type-aware.
  • Test coverage: New regression tests cover many happy paths, but they do not cover invalid cross-source JSON on ALTER or FE restart before the first post-ALTER commit on the FROM ... TO path.
  • Observability: Existing logging looks sufficient to diagnose these failures.
  • Transaction/persistence: There is a persistence bug for altered offsets on the FROM ... TO path because replay reconstructs from stale sourceProperties.
  • Data writes/modifications: The restart case can resume consumption from the wrong offset, so data correctness is at risk.
  • FE/BE variable passing: No new propagation issue beyond the stale sourceProperties state above.
  • Performance: No material performance regression found in the touched code.
  • Other issues: None beyond the two blocking findings above.

return null;
}

@Override
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only checks whether the value is JSON, not whether the JSON is valid for the current source type. As written, a PostgreSQL job will accept MySQL-style JSON like {"file":"binlog.000001","pos":"154"} here, because deserializeOffsetProperty() also wraps any JSON object in a BinlogSplit. The ALTER succeeds, but the resumed job later fails in PostgresSourceReader.generatePostgresConfig() because that path requires an lsn. Please validate the JSON shape against sourceType here (MySQL: file + pos; PostgreSQL: lsn) instead of treating every JSON object as valid.

if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
&& S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For FROM MYSQL/POSTGRES ... TO jobs this only updates the in-memory provider. The durable state used by replay/reconstruction is still sourceProperties (gsonPostProcess() rebuilds new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties) from that map, and createStreamingMultiTblTask() also forwards it to BE). If FE restarts before the next successful commit, the altered offset is lost and the job resumes from the old mode/offset. Please also sync the new offset into sourceProperties for the non-TVF JDBC path before the ALTER is journaled.

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

FE Regression Coverage Report

Increment line coverage 12.50% (7/56) 🎉
Increment coverage report
Complete coverage report

JNSimba and others added 7 commits April 15, 2026 14:55
…tJob create and alter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… alter offset test

- CDC ALTER only allows JSON specific offset (reject named modes like initial/latest)
- ALTER offset uses PROPERTIES('offset'='{"file":"xxx","pos":"yyy"}') syntax
- Update regression cases to use PROPERTIES for ALTER
- Add cdc_stream TVF ALTER offset regression test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…urceType().name()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…vider, fix serialization

- Add validateAlterOffset() to SourceOffsetProvider interface, CDC impl
  rejects non-JSON offset, S3 impl allows any format (default no-op)
- StreamingInsertJob delegates ALTER offset validation to provider
- AlterJobCommand.validateProps calls validateAlterOffset before validateOffset
- Restore S3-only guard in initInsertJob for PROPERTIES offset
- Fix JdbcOffset.toSerializedJson() to produce flat format compatible
  with deserializeOffset(): [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
- Remove unnecessary ts_usec padding in PostgresSourceReader JSON offset
- Remove instanceof check in modifyPropertiesInternal (validation moved to AlterJobCommand)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…izeOffsetProperty

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba JNSimba force-pushed the improve_special_offset branch from a543e1b to b7c7a15 Compare April 15, 2026 06:55
JNSimba and others added 2 commits April 15, 2026 16:45
…mprove test cases

- MySQL BE: supplement kind=SPECIFIC for ALTER offset in createBinlogSplit,
  add WARN log for fallback to config/earliest offset
- PG BE: supplement ts_usec=0 in createOffset for ALTER offset
- FE: sync offsetProviderPersist after ALTER offset update,
  remove debug logs from hasMoreDataToConsume
- Improve test cases: verify ALTER offset truly takes effect by checking
  data before/after the offset mark position
- Add golden files for qt_select assertions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  1. fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java: FE now accepts any JSON object as a JDBC offset, but the MySQL/PostgreSQL readers only support source-specific shapes (file+pos for MySQL, lsn for PostgreSQL). That means malformed JSON offsets can pass DDL-time validation and only fail later at runtime.
  2. fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java: ALTER JOB ... FROM MYSQL/POSTGRES (... 'offset'=...) is validated as if it were supported, but the altered source offset is never applied to offsetProvider.currentOffset, so the resumed job continues from the old position.

Critical checkpoint conclusions:

  • Goal of the task: Partially accomplished. The PR adds offset support for CDC streaming jobs, but the two issues above leave important alter/validation paths incorrect. Existing regression tests cover happy paths, but they do not prove malformed JSON shape rejection or the ALTER JOB ... FROM ... offset-reset behavior.
  • Small / clear / focused: Mostly yes. The change set is focused on CDC offset handling, FE validation, and regression coverage.
  • Concurrency: No new blocking concurrency issue found in the reviewed FE/CDC paths. The changes stay on existing job/task flows and do not introduce new lock ordering problems in the touched code.
  • Lifecycle / static initialization: No new special lifecycle or static initialization issue found.
  • Configuration items: No new config item added.
  • Compatibility / incompatible change: No storage-format or protocol compatibility break was identified, but FE/BE offset-shape validation is now inconsistent.
  • Parallel code paths: Not fully handled. The TVF PROPERTIES('offset'=...) path and the FROM MYSQL/POSTGRES alter path are not updated consistently.
  • Special conditional checks: Not sufficient. JSON-offset checks currently validate only “is object JSON”, not whether the shape is valid for the selected source.
  • Test coverage: Incomplete for the new behavior. Missing negative coverage for source-specific malformed JSON offsets and for ALTER JOB ... FROM MYSQL/POSTGRES (... 'offset'=...) actually taking effect.
  • Modified test results: The added .out files look consistent with the checked-in suites, but they do not cover the blocking cases above.
  • Observability: Existing logging looks adequate for these paths.
  • Transaction / persistence: No new edit-log format issue found, but the FROM ... alter path fails to update the provider state that actually drives subsequent consumption.
  • Data writes / modifications: Blocking issue present. A user can request an offset reset through the FROM MYSQL/POSTGRES alter syntax and the job can continue consuming from the previous offset instead.
  • FE/BE variable passing: The logical contract for offset format is incomplete; unsupported JSON shapes are allowed through FE and then rejected later by the CDC reader implementation.
  • Performance: No new blocking performance issue found.
  • Other issues: None beyond the two findings above.

Requesting changes because both findings can produce incorrect user-visible behavior for the new offset feature.

JNSimba and others added 5 commits April 15, 2026 19:28
… instead of 2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… guide user to PROPERTIES

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… add test cases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…rease PG await timeout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

FE Regression Coverage Report

Increment line coverage 24.19% (15/62) 🎉
Increment coverage report
Complete coverage report

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 16, 2026

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/62) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Copy link
Copy Markdown
Contributor

FE Regression Coverage Report

Increment line coverage 14.52% (9/62) 🎉
Increment coverage report
Complete coverage report

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants